Перейти к основному содержимому

7.04. Справочник по Logstash

Разработчику Архитектору Инженеру

Справочник по Logstash

Logstash — это серверная программа с открытым исходным кодом для сбора, обработки и пересылки событий и журналов. Она принимает данные из множества источников, преобразует их в соответствии с заданными правилами и отправляет в различные системы назначения, такие как Elasticsearch, файлы, базы данных или очереди сообщений.


Архитектура Logstash

Logstash работает по конвейерной модели, состоящей из трёх основных компонентов:

  • Input — получает данные из внешних источников.
  • Filter — преобразует и обогащает данные.
  • Output — отправляет обработанные данные в целевые системы.

Каждый компонент реализуется через плагины. Logstash поставляется с сотнями встроенных плагинов и поддерживает пользовательские расширения.


Input-плагины

Input-плагины определяют источники данных. Ниже приведены наиболее часто используемые.

file

Читает события из файлов, аналогично команде tail -F.

Основные параметры:

  • path — путь к файлу или шаблон (например, /var/log/*.log).
  • start_position — позиция начала чтения (beginning или end).
  • sincedb_path — путь к файлу, хранящему смещение последнего прочитанного байта.
  • codec — кодек для разбора содержимого (по умолчанию plain).
  • type — добавляет поле type к событию.
  • tags — массив тегов, добавляемых к событию.

Пример:

input {
file {
path => "/var/log/app.log"
start_position => "beginning"
sincedb_path => "/dev/null"
type => "application_log"
}
}

beats

Получает данные от Filebeat, Metricbeat и других Beats.

Основные параметры:

  • port — порт прослушивания (например, 5044).
  • host — адрес интерфейса для привязки ("0.0.0.0" для всех).
  • ssl — включает TLS/SSL.
  • ssl_certificate, ssl_key — пути к сертификату и ключу при использовании SSL.

Пример:

input {
beats {
port => 5044
}
}

http

Принимает события через HTTP(S) POST-запросы.

Основные параметры:

  • port — порт HTTP-сервера.
  • host — хост для привязки.
  • ssl — включает HTTPS.
  • response_headers — заголовки ответа клиенту.
  • codec — кодек для разбора тела запроса (json, plain, line и др.).

Пример:

input {
http {
port => 8080
codec => json
}
}

stdin

Читает события из стандартного ввода. Полезен для тестирования.

Параметры:

  • codec — кодек разбора (line, json_lines и др.).

Пример:

input {
stdin {
codec => json_lines
}
}

tcp / udp

Принимает события по протоколам TCP или UDP.

Общие параметры:

  • port — порт прослушивания.
  • host — хост привязки.
  • codec — кодек (line, json, netflow и др.).
  • type — тип события.
  • Для TCP: ssl_enable, ssl_cert, ssl_key.

Пример TCP:

input {
tcp {
port => 9999
codec => json
}
}

Filter-плагины

Filter-плагины применяются для преобразования событий. Порядок фильтров в конфигурации важен.

grok

Разбирает неструктурированный текст с помощью регулярных выражений и шаблонов.

Основные параметры:

  • match — сопоставление поля с шаблоном (например, "message" => "%{COMBINEDAPACHELOG}").
  • patterns_dir — каталог с пользовательскими шаблонами.
  • break_on_match — прекращать ли обработку после первого совпадения.
  • tag_on_failure — тег, добавляемый при неудаче (по умолчанию _grokparsefailure).

Встроенные шаблоны: IP, TIMESTAMP_ISO8601, NUMBER, WORD, DATA, GREEDYDATA и сотни других.

Пример:

filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
}
}

date

Преобразует строковое представление времени в поле @timestamp.

Основные параметры:

  • match — массив из имени поля и формата даты (например, ["log_timestamp", "yyyy-MM-dd HH:mm:ss"]).
  • timezone — часовой пояс (например, "Europe/Moscow").
  • target — целевое поле (по умолчанию @timestamp).

Пример:

filter {
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
timezone => "UTC"
}
}

mutate

Выполняет базовые операции над полями: переименование, удаление, преобразование типов, объединение и разделение.

Основные действия:

  • rename — переименовывает поле.
  • remove_field — удаляет поле.
  • convert — преобразует тип (string, integer, float, boolean).
  • add_field — добавляет новое поле.
  • split — разделяет строку по разделителю.
  • join — объединяет массив в строку.
  • strip — удаляет пробелы по краям.
  • gsub — замена подстрок по регулярному выражению.

Пример:

filter {
mutate {
convert => { "bytes" => "integer" }
rename => { "host_ip" => "client_ip" }
remove_field => ["temp_field"]
}
}

json

Разбирает JSON-строку из указанного поля и вставляет результат как подобъект.

Основные параметры:

  • source — имя поля с JSON-строкой.
  • target — имя поля, куда поместить результат (по умолчанию корень события).
  • skip_on_invalid_json — пропускать ли событие при ошибке разбора.

Пример:

filter {
json {
source => "message"
target => "parsed"
}
}

kv (key-value)

Извлекает пары ключ=значение из строки (часто используется для логов вида user=admin action=login).

Основные параметры:

  • source — поле для разбора.
  • field_split — символ разделения полей (по умолчанию &).
  • value_split — символ разделения ключа и значения (по умолчанию =).
  • target — целевое поле.
  • include_keys, exclude_keys — фильтрация ключей.

Пример:

filter {
kv {
source => "message"
field_split => " "
value_split => "="
}
}

geoip

Добавляет географическую информацию на основе IP-адреса.

Основные параметры:

  • source — поле с IP-адресом.
  • target — поле для сохранения результата (по умолчанию geoip).
  • database — путь к MaxMind GeoLite2 базе (.mmdb).
  • fields — список полей для извлечения (city_name, country_code2, location и др.).

Пример:

filter {
geoip {
source => "client_ip"
database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
fields => ["city_name", "country_code2", "location"]
}
}

useragent

Разбирает строку User-Agent и извлекает информацию о браузере, ОС и устройстве.

Основные параметры:

  • source — поле со строкой User-Agent.
  • target — целевое поле (по умолчанию user_agent).
  • regexes — пользовательские регулярные выражения.

Пример:

filter {
useragent {
source => "agent"
target => "browser"
}
}

drop

Отбрасывает событие целиком. Используется в условных блоках.

Параметры:

  • percentage — процент событий для отбрасывания (для выборочной фильтрации).

Пример:

filter {
if [level] == "debug" {
drop {}
}
}

clone

Создаёт копию события. Полезно для отправки одного события в несколько выходов с разными преобразованиями.

Параметры:

  • clones — массив имён клонов.
  • add_tag — теги для добавления к клонам.

Пример:

filter {
clone {
clones => ["archive_copy"]
add_tag => ["archived"]
}
}

Output-плагины

Output-плагины отправляют обработанные события в целевые системы.

elasticsearch

Отправляет события в Elasticsearch.

Основные параметры:

  • hosts — массив хостов Elasticsearch (["http://localhost:9200"]).
  • index — шаблон имени индекса (например, "logs-%{+YYYY.MM.dd}").
  • document_id — уникальный идентификатор документа.
  • user, password — учётные данные при включённой аутентификации.
  • ssl_certificate_verification — проверка SSL-сертификата.
  • ilm_enabled — использование Lifecycle Management.
  • pipeline — имя ingest pipeline в Elasticsearch.

Пример:

output {
elasticsearch {
hosts => ["http://es-cluster:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "changeme"
}
}

file

Записывает события в файл.

Основные параметры:

  • path — путь к файлу.
  • codec — кодек сериализации (json_lines, plain, rubydebug).
  • write_behavior — поведение записи (overwrite, append).
  • flush_interval — интервал сброса буфера (в секундах).

Пример:

output {
file {
path => "/var/log/processed/%{type}.log"
codec => json_lines
}
}

stdout

Выводит события в стандартный поток вывода. Используется для отладки.

Параметры:

  • codec — кодек (rubydebug, json, dots).

Пример:

output {
stdout {
codec => rubydebug
}
}

http

Отправляет события через HTTP(S) POST или PUT.

Основные параметры:

  • url — URL конечной точки.
  • http_method — метод (post, put).
  • format — формат тела (json, form).
  • headers — заголовки запроса.
  • user, password — базовая аутентификация.
  • ssl_certificate_validation — проверка SSL.

Пример:

output {
http {
url => "https://api.example.com/events"
http_method => "post"
format => "json"
headers => { "Authorization" => "Bearer xyz" }
}
}

kafka

Отправляет события в Apache Kafka.

Основные параметры:

  • bootstrap_servers — список брокеров ("host1:9092,host2:9092").
  • topic_id — тема Kafka.
  • key — ключ сообщения (для партиционирования).
  • compression_type — тип сжатия (none, gzip, snappy, lz4).
  • ssl — настройки SSL/TLS.
  • sasl — настройки SASL аутентификации.

Пример:

output {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "logstash-events"
compression_type => "snappy"
}
}

null

Отбрасывает события без вывода. Используется для тестирования производительности.

Пример:

output {
null {}
}

Кодеки (Codecs)

Кодеки определяют, как данные читаются из источника или записываются в назначение. Они работают внутри input- и output-плагинов и отвечают за сериализацию и десериализацию событий.

plain

Простейший кодек: каждая строка рассматривается как отдельное событие. Используется по умолчанию во многих input-плагинах.

Параметры:

  • charset — кодировка (например, "UTF-8").

json

Разбирает строку как JSON-объект и создаёт событие на его основе.

Параметры:

  • charset — кодировка.
  • target — поле, куда поместить результат (по умолчанию корень события).

json_lines

Обрабатывает поток JSON-объектов, каждый из которых занимает одну строку (newline-delimited JSON).

Параметры:

  • charset — кодировка.

multiline

Объединяет несколько строк в одно событие по заданному шаблону. Часто используется для логов с многострочными стеками ошибок.

Основные параметры:

  • pattern — регулярное выражение, определяющее начало нового события.
  • negate — инвертировать условие (true означает, что строки, не соответствующие шаблону, присоединяются к предыдущей).
  • what — указывает, к какой части относится совпадение (previous или next).
  • max_lines — максимальное количество строк в одном событии.
  • max_bytes — максимальный размер буфера в байтах.

Пример для Java-стеков:

codec => multiline {
pattern => "^\s"
what => "previous"
negate => true
}

rubydebug

Форматирует событие в человекочитаемый вид с цветами и отступами (только для вывода). Аналог stdout { codec => rubydebug }.

line

Читает входной поток построчно. Похож на plain, но позволяет явно указать разделитель строк.

Параметры:

  • delimiter — символ окончания строки (по умолчанию \n).

Условные конструкции

Logstash поддерживает условные выражения (if, else if, else) для управления потоком обработки.

Синтаксис:

if [field] == "value" {
mutate { ... }
} else if [status] =~ /^5\d\d$/ {
drop {}
} else {
# действие по умолчанию
}

Операторы сравнения:

  • ==, !=, <, >, <=, >=
  • =~ — совпадение с регулярным выражением
  • !~ — отсутствие совпадения
  • in — проверка вхождения (для массивов и строк)
  • not in — отрицание вхождения

Логические операторы:

  • and, or, not

Примеры:

if "error" in [message] {
mutate { add_tag => ["error_log"] }
}

if [log_level] == "DEBUG" and [environment] == "production" {
drop {}
}

Поля события

Каждое событие в Logstash — это документ с полями. Поля могут быть скалярами (строка, число, булево), массивами или вложенными объектами.

Доступ к полям:

  • [field] — простое поле
  • [nested][field] — вложенное поле
  • [array][0] — первый элемент массива

Специальные поля:

  • @timestamp — временная метка события (автоматически добавляется при создании события)
  • @version — версия формата события (обычно "1")
  • host — имя хоста, с которого пришло событие (если источник предоставляет)
  • message — исходное содержимое события

Настройки производительности и масштабирования

Pipeline Workers

Logstash использует несколько рабочих потоков для параллельной обработки событий.

Параметр:

  • -w N или pipeline.workers: N в logstash.yml — количество рабочих потоков (по умолчанию равно числу ядер CPU).

Рекомендация: не увеличивать сверх числа ядер без профилирования — может вызвать конкуренцию за ресурсы.

Batch Size

Количество событий, обрабатываемых одним рабочим за раз.

Параметр:

  • pipeline.batch.size (по умолчанию 125)

Увеличение размера пакета повышает пропускную способность, но увеличивает задержку.

Batch Delay

Максимальное время ожидания заполнения пакета перед обработкой.

Параметр:

  • pipeline.batch.delay (в миллисекундах, по умолчанию 50)

Queue

Logstash поддерживает две очереди:

  • In-memory queue — по умолчанию, быстрая, но теряет данные при аварийном завершении.
  • Persistent queue — на диске, обеспечивает отказоустойчивость.

Настройки persistent queue (logstash.yml):

queue.type: persisted
path.queue: "/var/lib/logstash/queue"
queue.page_capacity: 250mb
queue.max_events: 0
queue.max_bytes: 10gb

Мониторинг Logstash

Logstash предоставляет метрики через HTTP API и может интегрироваться с мониторинговыми системами.

Встроенный HTTP API

Включается в logstash.yml:

http.host: "127.0.0.1"
http.port: 9600

Эндпоинты:

  • GET / — общая информация
  • GET /_node/pipeline — состояние pipeline
  • GET /_node/stats — статистика по входам, фильтрам, выходам

Метрики

Включают:

  • количество обработанных событий
  • скорость обработки (events/sec)
  • задержки
  • ошибки

Можно отправлять метрики в Elasticsearch, Prometheus (через плагин prometheus_exporter) или StatsD.


CLI-параметры запуска

Основные флаги командной строки:

  • -f CONFIG_FILE — путь к файлу конфигурации
  • -e CONFIG_STRING — конфигурация в виде строки
  • -l LOG_DIR — каталог для логов
  • -w WORKERS — количество рабочих потоков
  • --config.reload.automatic — автоматическая перезагрузка при изменении конфигурации
  • --config.reload.interval=3s — интервал проверки изменений
  • --quiet, --verbose, --debug — уровни логирования

Пример запуска:

bin/logstash -f /etc/logstash/conf.d/app.conf --config.reload.automatic

Примеры полных конфигураций

1. Обработка Apache-логов

input {
file {
path => "/var/log/apache2/access.log"
start_position => "beginning"
sincedb_path => "/dev/null"
type => "apache_access"
}
}

filter {
if [type] == "apache_access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
geoip {
source => "clientip"
}
useragent {
source => "agent"
target => "user_agent"
}
mutate {
remove_field => ["message", "timestamp"]
}
}
}

output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "apache-logs-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}

2. Приём JSON через HTTP и отправка в Kafka

input {
http {
port => 8080
codec => json
}
}

filter {
mutate {
add_field => { "received_at" => "%{@timestamp}" }
}
}

output {
kafka {
bootstrap_servers => "kafka:9092"
topic_id => "incoming-events"
compression_type => "snappy"
}
}

3. Фильтрация и клонирование для архива

input {
beats {
port => 5044
}
}

filter {
if [log][level] == "error" {
clone {
clones => ["archive"]
add_tag => ["archived_error"]
}
}
}

output {
if "archived_error" in [tags] {
file {
path => "/var/log/errors-archive/%{+YYYY-MM-dd}.log"
codec => json_lines
}
}

elasticsearch {
hosts => ["http://es:9200"]
index => "logs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}

Расширенные фильтры и специализированные плагины

csv

Разбирает строки в формате CSV (значения, разделённые запятыми или другим символом).

Основные параметры:

  • separator — символ-разделитель (по умолчанию ,)
  • columns — массив имён колонок, которые будут использоваться как ключи полей
  • source — поле, содержащее строку CSV
  • target — поле, в которое помещаются результаты (по умолчанию корень события)
  • quote_char — символ кавычек для экранирования значений (по умолчанию ")

Пример:

filter {
csv {
separator => ";"
columns => ["timestamp", "user_id", "action", "status"]
source => "message"
}
}

urldecode

Декодирует URL-кодированные строки (например, %D0%9F%D1%80%D0%B8%D0%B2%D0%B5%D1%82Привет).

Параметры:

  • field — имя поля для декодирования
  • target — целевое поле (если не указано, заменяет исходное)

Пример:

filter {
urldecode {
field => "query_string"
}
}

split

Разделяет одно событие на несколько по указанному полю (обычно массиву).

Параметры:

  • field — поле-массив, по которому выполняется разделение
  • terminator — если поле — строка, используется как разделитель

Пример:

# Исходное событие: { "tags": ["a", "b", "c"] }
filter {
split { field => "tags" }
}
# Результат: три события с tags = "a", "b", "c"

fingerprint

Создаёт уникальный хеш на основе указанных полей. Используется для дедупликации или анонимизации.

Параметры:

  • source — поле или массив полей для хеширования
  • target — имя нового поля для хеша
  • method — алгоритм (MD5, SHA1, SHA256, MURMUR3)
  • key — секретный ключ для HMAC (опционально)

Пример:

filter {
fingerprint {
source => ["client_ip", "user_agent"]
target => "session_id"
method => "MURMUR3"
}
}

aggregate

Группирует события по ключу и накапливает данные из нескольких событий в одно. Полезен для сбора связанных записей (например, начало и завершение транзакции).

Параметры:

  • task_id — выражение для формирования ключа группировки
  • code — Ruby-код для агрегации
  • push_previous_event — отправлять ли предыдущее событие при новом ключе
  • timeout — время ожидания завершения группы

Пример (накопление всех строк лога под одним ID):

filter {
aggregate {
task_id => "%{transaction_id}"
code => "
map['lines'] ||= []
map['lines'] << event.get('message')
event.cancel()
"
push_map_as_event_on_timeout => true
timeout_task_id_field => "transaction_id"
timeout => 30
}
}

Безопасность и управление доступом

TLS/SSL в input и output

Logstash поддерживает шифрование трафика через TLS во многих плагинах.

Общие параметры для SSL:

  • ssl_enable — включить SSL
  • ssl_verify — проверять сертификат клиента/сервера
  • ssl_cert — путь к сертификату
  • ssl_key — путь к приватному ключу
  • ssl_key_passphrase — пароль к ключу
  • ssl_ca_cert — путь к CA-сертификату

Пример безопасного HTTP-входа:

input {
http {
port => 8443
ssl => true
ssl_certificate => "/etc/logstash/certs/server.crt"
ssl_key => "/etc/logstash/certs/server.key"
}
}

Аутентификация

  • В elasticsearch output: параметры user и password
  • В http output/input: заголовки Authorization, базовая аутентификация
  • В Kafka: SASL/SCRAM, Kerberos (через sasl_jaas_config, sasl_mechanism)

Управление конфигурациями

Организация файлов

Рекомендуется разделять конфигурацию на части:

  • 01-inputs.conf
  • 10-filters-common.conf
  • 20-filters-app.conf
  • 99-outputs.conf

Logstash загружает все .conf файлы из указанной директории в лексикографическом порядке.

Переменные окружения

Можно использовать переменные окружения в конфигурации:

input {
elasticsearch {
hosts => "${ES_HOSTS:localhost:9200}"
user => "${ES_USER:elastic}"
password => "${ES_PASSWORD}"
}
}

Если переменная не задана, используется значение после двоеточия (по умолчанию).

Автоматическая перезагрузка

Включается флагом --config.reload.automatic. Logstash отслеживает изменения в файлах и перезагружает pipeline без остановки.

Важно: ошибки в новой конфигурации не приводят к остановке — старая версия продолжает работать.


Обработка ошибок и отладка

Теги ошибок

Некоторые плагины автоматически добавляют теги при ошибках:

  • _grokparsefailure
  • _jsonparsefailure
  • _dateparsefailure

Можно использовать их для маршрутизации проблемных событий:

output {
if "_grokparsefailure" in [tags] {
file { path => "/var/log/logstash/grok_failures.log" }
} else {
elasticsearch { ... }
}
}

Логирование

Уровни логов: ERROR, WARN, INFO, DEBUG.

Файлы логов по умолчанию: logs/logstash-plain.log, logs/logstash-json.log.

Можно настроить в config/log4j2.properties.

Dry-run и проверка синтаксиса

Команда для проверки конфигурации без запуска:

bin/logstash --config.test_and_exit -f my.conf

Интеграция с Beats

Beats (Filebeat, Metricbeat и др.) являются лёгкими агентами для сбора данных и отправки в Logstash.

Рекомендуемая схема:

  • Filebeat читает логи → отправляет в Logstash через протокол Lumberjack
  • Logstash обрабатывает → отправляет в Elasticsearch

Преимущества:

  • Filebeat гарантирует доставку (подтверждение получения)
  • Минимальное потребление ресурсов на хосте-источнике
  • Поддержка модулей (например, filebeat modules enable nginx)

Конфигурация Filebeat для Logstash:

output.logstash:
hosts: ["logstash-host:5044"]

Пользовательские плагины

Logstash позволяет создавать собственные плагины на Ruby.

Типы плагинов:

  • Input
  • Filter
  • Output
  • Codec

Структура плагина:

  • Наследуется от базового класса (LogStash::Inputs::Base и др.)
  • Реализует методы register, run, close
  • Упаковывается как gem

Установка:

bin/logstash-plugin install /path/to/my-plugin.gem